package upgrade

import (
	"context"
	"errors"
	"fmt"
	"os"
	"time"

	"github.com/rancher/system-upgrade-controller/pkg/crds"
	upgradectl "github.com/rancher/system-upgrade-controller/pkg/generated/controllers/upgrade.cattle.io"
	"github.com/rancher/system-upgrade-controller/pkg/version"
	"github.com/rancher/wrangler/v3/pkg/apply"
	"github.com/rancher/wrangler/v3/pkg/crd"
	batchctl "github.com/rancher/wrangler/v3/pkg/generated/controllers/batch"
	corectl "github.com/rancher/wrangler/v3/pkg/generated/controllers/core"
	"github.com/rancher/wrangler/v3/pkg/leader"
	"github.com/rancher/wrangler/v3/pkg/schemes"
	"github.com/rancher/wrangler/v3/pkg/start"
	"github.com/sirupsen/logrus"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/record"
)

// readyDuration is the time to wait for the CRDs to be ready.
const readyDuration = time.Minute

// Sentinel errors exposed by this package.
var (
	// ErrPlanNotReady reports a plan that is not valid and resolved.
	ErrPlanNotReady = errors.New("plan is not valid and resolved")

	// ErrOutsideWindow reports that the current time is not within the
	// configured window.
	ErrOutsideWindow = errors.New("current time is not within configured window")

	// ErrControllerNameRequired is returned by NewController when name is empty.
	ErrControllerNameRequired = errors.New("controller name is required")

	// ErrControllerNamespaceRequired is returned by NewController when
	// namespace is empty.
	ErrControllerNamespaceRequired = errors.New("controller namespace is required")
)

// Controller holds the clients, controller factories and event recorder used
// by this package. Build one with NewController and run it with Start.
type Controller struct {
	// Namespace and Name identify this controller: Start passes them to leader
	// election and includes them in the events it emits, and Name is also the
	// event source component. NodeName is the event source host and the name
	// of the Node reference that Start attaches its events to.
	Namespace string
	Name      string
	NodeName  string

	// cfg is the REST configuration the clients are built from; kcs is the
	// Kubernetes clientset created from it.
	cfg *rest.Config
	kcs *kubernetes.Clientset

	// clusterID is set by Start to the UID of the kube-system namespace.
	// leaderElect selects whether Start runs under leader election.
	clusterID   string
	leaderElect bool

	// Controller factories created by NewController and started together by
	// Start.
	coreFactory    *corectl.Factory
	batchFactory   *batchctl.Factory
	upgradeFactory *upgradectl.Factory

	// apply is created from cfg by NewController. recorder emits events with
	// Name as the source component and NodeName as the source host.
	apply    apply.Apply
	recorder record.EventRecorder
}

// NewController builds a Controller identified by namespace and name.
//
// namespace and name are required: an empty value yields
// ErrControllerNamespaceRequired or ErrControllerNameRequired respectively.
// An empty nodeName falls back to os.Hostname, and a nil cfg falls back to
// rest.InClusterConfig. leaderElect is stored for Start, and resync is passed
// as the Resync option of every controller factory.
//
// Failures of the underlying constructors are wrapped with %w so the message
// names the step that failed while errors.Is / errors.As still see the cause.
// On error the returned Controller is nil.
func NewController(cfg *rest.Config, namespace, name, nodeName string, leaderElect bool, resync time.Duration) (ctl *Controller, err error) {
	if namespace == "" {
		return nil, ErrControllerNamespaceRequired
	}
	if name == "" {
		return nil, ErrControllerNameRequired
	}

	if nodeName == "" {
		nodeName, err = os.Hostname()
		if err != nil {
			return nil, fmt.Errorf("resolving node name from hostname: %w", err)
		}
	}

	if cfg == nil {
		cfg, err = rest.InClusterConfig()
		if err != nil {
			return nil, fmt.Errorf("loading in-cluster config: %w", err)
		}
	}

	ctl = &Controller{
		Namespace:   namespace,
		Name:        name,
		NodeName:    nodeName,
		cfg:         cfg,
		leaderElect: leaderElect,
	}

	ctl.kcs, err = kubernetes.NewForConfig(cfg)
	if err != nil {
		return nil, fmt.Errorf("creating kubernetes clientset: %w", err)
	}

	// Every factory is scoped to the controller namespace and shares the same
	// resync interval; each one takes the options type of its own package.
	ctl.coreFactory, err = corectl.NewFactoryFromConfigWithOptions(cfg, &corectl.FactoryOptions{
		Namespace: namespace,
		Resync:    resync,
	})
	if err != nil {
		return nil, fmt.Errorf("creating core controller factory: %w", err)
	}
	ctl.batchFactory, err = batchctl.NewFactoryFromConfigWithOptions(cfg, &batchctl.FactoryOptions{
		Namespace: namespace,
		Resync:    resync,
	})
	if err != nil {
		return nil, fmt.Errorf("creating batch controller factory: %w", err)
	}
	ctl.upgradeFactory, err = upgradectl.NewFactoryFromConfigWithOptions(cfg, &upgradectl.FactoryOptions{
		Namespace: namespace,
		Resync:    resync,
	})
	if err != nil {
		return nil, fmt.Errorf("creating upgrade controller factory: %w", err)
	}
	ctl.apply, err = apply.NewForConfig(cfg)
	if err != nil {
		return nil, fmt.Errorf("creating apply client: %w", err)
	}

	// Events are logged and also recorded through the core/v1 events client.
	// The sink uses metav1.NamespaceAll, so it is not tied to ctl.Namespace.
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartStructuredLogging(0)
	eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: ctl.kcs.CoreV1().Events(metav1.NamespaceAll)})
	ctl.recorder = eventBroadcaster.NewRecorder(schemes.All, corev1.EventSource{Component: ctl.Name, Host: ctl.NodeName})

	return ctl, nil
}

// Start prepares the controller and starts its controller factories.
//
// It stores the UID of the kube-system namespace as the cluster ID, registers
// the CRDs, registers the job, node, plan and secret handlers, and then starts
// the core, batch and upgrade factories with the given number of threads. With
// leader election enabled the factories are started from the callback handed
// to leader.RunOrDie; otherwise they are started directly.
//
// Setup failures are returned wrapped with %w so the message names the step
// that failed. A failure to start the factories is recorded as a warning event
// and then panics through logrus.Panicf.
//
// NOTE(review): whether Start returns in leader-election mode depends on
// leader.RunOrDie, which is not visible here — confirm before relying on it.
func (ctl *Controller) Start(ctx context.Context, threads int) error {
	// This is consistent with events attached to the node generated by the kubelet
	// https://github.com/kubernetes/kubernetes/blob/612130dd2f4188db839ea5c2dea07a96b0ad8d1c/pkg/kubelet/kubelet.go#L479-L485
	nodeRef := &corev1.ObjectReference{
		Kind:      "Node",
		Name:      ctl.NodeName,
		UID:       types.UID(ctl.NodeName),
		Namespace: "",
	}

	// cluster id hack: see https://groups.google.com/forum/#!msg/kubernetes-sig-architecture/mVGobfD4TpY/nkdbkX1iBwAJ
	systemNS, err := ctl.kcs.CoreV1().Namespaces().Get(ctx, metav1.NamespaceSystem, metav1.GetOptions{})
	if err != nil {
		return fmt.Errorf("getting namespace %q: %w", metav1.NamespaceSystem, err)
	}
	// The UID is a string-kinded type, so a plain conversion is enough.
	ctl.clusterID = string(systemNS.UID)

	if err := ctl.registerCRD(ctx); err != nil {
		return fmt.Errorf("registering CRDs: %w", err)
	}

	// register our handlers
	if err := ctl.handleJobs(ctx); err != nil {
		return fmt.Errorf("registering job handlers: %w", err)
	}
	if err := ctl.handleNodes(ctx); err != nil {
		return fmt.Errorf("registering node handlers: %w", err)
	}
	if err := ctl.handlePlans(ctx); err != nil {
		return fmt.Errorf("registering plan handlers: %w", err)
	}
	if err := ctl.handleSecrets(ctx); err != nil {
		return fmt.Errorf("registering secret handlers: %w", err)
	}

	appName := fmt.Sprintf("%s %s (%s)", version.Program, version.Version, version.GitCommit)

	// run starts all factories; it is either called directly or handed to the
	// leader election helper as its callback.
	run := func(ctx context.Context) {
		if err := start.All(ctx, threads, ctl.coreFactory, ctl.batchFactory, ctl.upgradeFactory); err != nil {
			ctl.recorder.Eventf(nodeRef, corev1.EventTypeWarning, "StartFailed", "%s failed to start controllers for %s/%s: %v", appName, ctl.Namespace, ctl.Name, err)
			logrus.Panicf("Failed to start controllers: %v", err)
		}
		ctl.recorder.Eventf(nodeRef, corev1.EventTypeNormal, "Started", "%s running as %s/%s", appName, ctl.Namespace, ctl.Name)
	}

	if ctl.leaderElect {
		ctl.recorder.Eventf(nodeRef, corev1.EventTypeNormal, "Starting", "%s starting leader election for %s/%s", appName, ctl.Namespace, ctl.Name)
		leader.RunOrDie(ctx, ctl.Namespace, ctl.Name, ctl.kcs, run)
	} else {
		run(ctx)
	}

	return nil
}

// registerCRD submits the CRDs returned by crds.List through an apiextensions
// client built from ctl.cfg, passing readyDuration as the time to wait for
// them to be ready. Any error from listing the CRDs, building the client or
// the batch create call is returned unchanged.
func (ctl *Controller) registerCRD(ctx context.Context) error {
	definitions, err := crds.List()
	if err != nil {
		return err
	}

	extClient, err := clientset.NewForConfig(ctl.cfg)
	if err != nil {
		return err
	}
	crdClient := extClient.ApiextensionsV1().CustomResourceDefinitions()

	return crd.BatchCreateCRDs(ctx, crdClient, nil, readyDuration, definitions)
}
